// Copyright 2022 Huawei Cloud Computing Technology Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <array>
#include <cassert>
#include <cstring>
#include <thread>
#include <linux/prctl.h>
#include <sys/prctl.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <securec.h>
#include "../CasController.h"
#include "../CasCommon.h"
#include "CasGadget.h"
#include "CasMsg.h"
#include "CasLog.h"
#include "CasBuffer.h"
#include "CasSocket.h"
#include "CasStreamRecvParser.h"

using namespace std;

const int STREAM_MAX_BUF_LEN = 32 * 1024 * 1024;

CasStreamRecvParser *CasStreamRecvParser::g_instance = nullptr;

CasStreamRecvParser *CasStreamRecvParser::GetInstance()
{
    if (g_instance == nullptr) {
        g_instance = new (std::nothrow) CasStreamRecvParser();
        if (g_instance == nullptr) {
            ERR("Failed to new stream recv parser.");
            return nullptr;
        }
    }
    return g_instance;
}

void CasStreamRecvParser::DestroyInstance()
{
    if (g_instance != nullptr) {
        delete g_instance;
        g_instance = nullptr;
    }
}

CasStreamRecvParser::CasStreamRecvParser()
{
    m_serviceHandles.fill(nullptr);
}

CasStreamRecvParser::~CasStreamRecvParser()
{
    for (auto i = 0; i < CasMsgType::End; i++) {
        m_serviceHandles[i] = nullptr;
    }
}

void CasStreamRecvParser::SetServiceHandle(unsigned char type, CasPktHandle *serviceHandle)
{
    m_serviceHandles[type] = serviceHandle;
}

CasPktHandle *CasStreamRecvParser::GetServiceHandle(unsigned char type)
{
    return VirtualSensor >= type && type >= VirtualCamera ? m_serviceHandles[VirtualDevice] : m_serviceHandles[type];
}

CasStreamParseThread::CasStreamParseThread(CasSocket *socket, CasStreamRecvParser *streamRecvParser)
{
    this->m_streamRecvParser = streamRecvParser;
    this->m_socket = socket;
    this->m_threadStatus = CAS_THREAD_INIT;
}

CasStreamParseThread::~CasStreamParseThread()
{
    this->m_socket = nullptr;
    this->m_streamRecvParser = nullptr;
}

void HandleCompletePktMsg(CasStreamParseThread *streamParseThread, streamMsgHead *msgHead, unsigned char *recvBuf,
    unsigned int pktStartPos)
{
    if (msgHead == nullptr) {
        return;
    }
    unsigned int dataCompleteLen = msgHead->GetPayloadSize() + sizeof(stream_msg_head_t);
    CasPktHandle *serviceHandle = streamParseThread->m_streamRecvParser->GetServiceHandle(msgHead->type);
    if (serviceHandle) {
        void *pTmp = AllocBuffer(dataCompleteLen);
        if (pTmp != nullptr) {
            if (EOK != memcpy_s(pTmp, dataCompleteLen, recvBuf + pktStartPos, dataCompleteLen)) {
                ERR("Memory copy data fail.");
                return;
            }

            serviceHandle->Handle((void *)pTmp);
        }
    }
}

void RecvDataTaskEntry(CasStreamParseThread *streamParseThread)
{
    char threadName[] = "StreamRecvParser";
    prctl(PR_SET_NAME, threadName);

    while (streamParseThread->m_threadStatus == CAS_THREAD_INIT &&
        streamParseThread->m_socket->GetStatus() != SOCKET_STATUS_RUNNING) {
    }

    unsigned int receivedDataLen = 0;
    unsigned int dataCompleteLen = 0;
    bool dataCompleteFlag = true;
    unsigned char *recvBuf;
    recvBuf = (unsigned char *)malloc(STREAM_MAX_BUF_LEN);
    if (recvBuf == nullptr) {
        ERR("Malloc receive Buf failed.");
        return;
    }

    streamMsgHead *msgHead = nullptr;
    while (streamParseThread->m_threadStatus == CAS_THREAD_RUNNING) {
        int recvBytes =
            streamParseThread->m_socket->Recv(recvBuf + receivedDataLen, STREAM_MAX_BUF_LEN - receivedDataLen);
        if (recvBytes <= 0) {
            if (SOCKET_RECV_FAIL_DISCONNECT == recvBytes) {
                break;
            }
            usleep(10000);
            continue;
        }

        if (dataCompleteFlag) {
            msgHead = (streamMsgHead *)recvBuf;
            if (msgHead->type <= Invalid || msgHead->type >= End) {
                dataCompleteLen = 0;
                receivedDataLen = 0;
                dataCompleteFlag = true;
                continue;
            }
            dataCompleteLen = msgHead->GetPayloadSize() + sizeof(stream_msg_head_t);
        }

        receivedDataLen += static_cast<unsigned int>(recvBytes);
        if (receivedDataLen < dataCompleteLen) { // data not complete
            dataCompleteFlag = false;
            continue;
        } else if (receivedDataLen > dataCompleteLen) { // data beyond complete size, tcp sticky pkt
            unsigned int remainLen = receivedDataLen;
            unsigned int pktStartPos = 0;
            int index = 0;
            while (remainLen >= sizeof(stream_msg_head_t)) { // handle complete pkt in remaining data
                streamMsgHead *tmpMsgHead;
                tmpMsgHead = (streamMsgHead *)(recvBuf + pktStartPos);
                unsigned int pktLen = tmpMsgHead->GetPayloadSize() + sizeof(stream_msg_head_t);
                if (pktLen <= remainLen) {
                    HandleCompletePktMsg(streamParseThread, tmpMsgHead, recvBuf, pktStartPos);
                    remainLen -= pktLen;
                    pktStartPos += pktLen;
                    index++;
                    continue;
                } else {
                    DBG("Handle multi pkt Complete.");
                    break;
                }
            }

            // handle not complete pkt data
            if (remainLen > 0) {
                if (pktStartPos > remainLen) {
                    if (EOK != memcpy_s(recvBuf, STREAM_MAX_BUF_LEN, recvBuf + pktStartPos, remainLen)) {
                        ERR("Handle not complete pkt data, memory copy fail.");
                    }
                } else {
                    if (EOK != memmove_s(recvBuf, STREAM_MAX_BUF_LEN, recvBuf + pktStartPos, remainLen)) {
                        ERR("Handle not complete pkt data, memory move fail.");
                    }
                }
            }
            receivedDataLen = remainLen;
            dataCompleteLen = 0;
            dataCompleteFlag = true;
        } else {
            receivedDataLen = 0;
            dataCompleteLen = 0;
            dataCompleteFlag = true;
            HandleCompletePktMsg(streamParseThread, msgHead, recvBuf, 0);
        }
    }

    free(recvBuf);
    streamParseThread->m_threadStatus = CAS_THREAD_INIT;
}

int CasStreamParseThread::Start()
{
    if (this->m_threadStatus == CAS_THREAD_RUNNING) {
        return -1;
    }

    this->m_task = thread(RecvDataTaskEntry, this);
    if (this->m_task.joinable()) {
        this->m_task.detach();
    }

    this->m_threadStatus = CAS_THREAD_RUNNING;

    INFO("Thread start");
    return 0;
}

int CasStreamParseThread::Stop()
{
    if (this->m_threadStatus != CAS_THREAD_RUNNING) {
        WARN("Thread is not running.");
        return -1;
    }

    unique_lock<mutex> lck(this->m_lock);
    if (this->m_threadStatus == CAS_THREAD_RUNNING) {
        this->m_threadStatus = CAS_THREAD_EXIT;
        shutdown(m_socket->GetFd(), SHUT_RDWR);
    }

    while (this->m_threadStatus == CAS_THREAD_EXIT) {
        usleep(100);
    }
    INFO("Thread exited");
    return 0;
}
